/*
 * Collie - An asynchronous event-driven network framework using Dlang development
 *
 * Copyright (C) 2015-2017 Shanghai Putao Technology Co., Ltd
 *
 * Developer: putao's Dlang team
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module collie.codec.mqtt.mqttdecoder;

import std.stdio;
import std.array;
import std.conv;
import std.experimental.allocator;
import std.experimental.allocator.gc_allocator;
import collie.codec.messagetobyteencoder;
import kiss.container.Vector;
import collie.codec.mqtt.bytebuf;
import collie.codec.mqtt.mqttcodecutil;
import collie.codec.mqtt.mqttconnackmessage;
import collie.codec.mqtt.mqttconnackvariableheader;
import collie.codec.mqtt.mqttconnectmsg;
import collie.codec.mqtt.mqttconnectpayload;
import collie.codec.mqtt.mqttconnectreturncode;
import collie.codec.mqtt.mqttconnectvariableheader;
import collie.codec.mqtt.mqttfixedheader;
import collie.codec.mqtt.mqttmsg;
import collie.codec.mqtt.mqttmsgidvariableheader;
import collie.codec.mqtt.mqttmsgtype;
import collie.codec.mqtt.mqttpubackmsg;
import collie.codec.mqtt.mqttpublishmsg;
import collie.codec.mqtt.mqttpublishpayload;
import collie.codec.mqtt.mqttpublishvariableheader;
import collie.codec.mqtt.mqttqos;
import collie.codec.mqtt.mqttsubackmsg;
import collie.codec.mqtt.mqttsubackpayload;
import collie.codec.mqtt.mqttsubscribemsg;
import collie.codec.mqtt.mqttsubscribepayload;
import collie.codec.mqtt.mqtttopicsubscription;
import collie.codec.mqtt.mqttunsubscribemsg;
import collie.codec.mqtt.mqttunsubscribepayload;
import collie.codec.mqtt.mqttversion;

import collie.channel.handler;
import collie.codec.bytetomessagedecoder;

/**
 * Pairs a decoded value with the number of bytes that were read to produce it.
 *
 * Both fields are private, so they are reachable only from code inside this
 * module (D applies `private` at module level).
 */
final class Result(T) {

private:
    /// The decoded value.
    T value;
    /// How many bytes were consumed while decoding `value`.
    int numberOfBytesConsumed;

public:
    /**
     * Params:
     *   _value = the decoded value to carry
     *   _numberOfBytesConsumed = number of bytes read while decoding it
     */
    this(T _value, int _numberOfBytesConsumed) {
        value = _value;
        numberOfBytesConsumed = _numberOfBytesConsumed;
    }
}

/**
 * Decodes raw bytes into MQTT messages.
 *
 * Each call to `decode` wraps the supplied bytes in a `ByteBuf` and decodes
 * messages from it one after another (fixed header, variable header, payload)
 * until the buffer's reader index reaches its length.
 */
class MqttDecoder : ByteToMessageDecoder!(MqttMsg[]) {

public:
    /// Creates a decoder that uses DEFAULT_MAX_BYTES_IN_MESSAGE as the size limit.
    this() {
        this(DEFAULT_MAX_BYTES_IN_MESSAGE);
    }

    /**
     * Params:
     *   maxBytesInMessage = largest "remaining length" accepted for a single
     *                       message; larger values make `decode` throw
     */
    this(int maxBytesInMessage) {
        _curstat = DecoderState.READ_FIXED_HEADER;
        this.maxBytesInMessage = maxBytesInMessage;
    }

    /**
     * Decodes `msg` and, when `decode` reports success, forwards the decoded
     * messages to the context via `fireRead`.
     */
    override void read(Context ctx, ubyte[] msg)
    {
        MqttMsg[] result;
        if (decode(ctx, msg, result))
        {
            ctx.fireRead(result);
        }
    }

    /**
     * Decodes every MQTT message contained in `buf` and appends them to `mqs`.
     *
     * Params:
     *   ctx = handler context (not used by the decoding itself)
     *   buf = raw bytes to decode
     *   mqs = receives the decoded messages, in the order they were read
     *
     * Returns: true when the buffer was decoded, false when `buf` is empty.
     *
     * Throws: Exception when a message exceeds `maxBytesInMessage`, when the
     *         bytes consumed do not match the declared remaining length, or
     *         when one of the field decoders rejects its input.
     */
    override bool decode(Context ctx, ubyte[] buf, ref MqttMsg[] mqs) {
        // Always leave the decoder ready for a fresh message, even when an
        // exception aborts decoding half-way through. Without this the next
        // call would resume in a stale state with a stale fixed header.
        scope(exit) resetState();

        // Nothing to decode; do not attempt to read a fixed header.
        if (buf.length == 0)
            return false;

        ByteBuf buffer = new ByteBuf(buf);

        while (_curstat != DecoderState.BAD_MESSAGE && _curstat != DecoderState.DECODE_FINISH)
        {
            switch (_curstat) {
                case DecoderState.READ_FIXED_HEADER:
                {
                    mqttFixedHeader = decodeFixedHeader(buffer);
                    bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
                    _curstat = DecoderState.READ_VARIABLE_HEADER;
                    break;
                }

                case DecoderState.READ_VARIABLE_HEADER:
                {
                    if (bytesRemainingInVariablePart > maxBytesInMessage) {
                        throw new Exception("too large message: " ~ to!string(bytesRemainingInVariablePart) ~ " bytes");
                    }
                    auto decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
                    variableHeader = decodedVariableHeader.value;
                    bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
                    _curstat = DecoderState.READ_PAYLOAD;
                    break;
                }

                case DecoderState.READ_PAYLOAD:
                {
                    auto decodedPayload = decodePayload(
                        buffer,
                        mqttFixedHeader.messageType(),
                        bytesRemainingInVariablePart,
                        variableHeader);
                    payload = decodedPayload.value;
                    bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;

                    // Validate before publishing the message so that a
                    // malformed message never ends up in the caller's array.
                    if (bytesRemainingInVariablePart != 0) {
                        throw new Exception(
                            "non-zero remaining payload bytes: " ~
                            to!string(bytesRemainingInVariablePart) ~ " (" ~ to!string(mqttFixedHeader.messageType()) ~ ')');
                    }

                    mqs ~= buildMessage(mqttFixedHeader, variableHeader, payload);

                    mqttFixedHeader = null;
                    variableHeader = null;
                    payload = null;

                    if (buffer.readerIndex == buffer.length)
                        _curstat = DecoderState.DECODE_FINISH;     // whole buffer decoded
                    else
                        _curstat = DecoderState.READ_FIXED_HEADER; // parse the next message
                    break;
                }

                default:
                    // Shouldn't reach here.
                    throw new Exception("decoder error");
            }
        }

        return true;
    }

private:
    /// Puts the per-message state back to "waiting for a fixed header".
    void resetState() {
        _curstat = DecoderState.READ_FIXED_HEADER;
        mqttFixedHeader = null;
        variableHeader = null;
        payload = null;
        bytesRemainingInVariablePart = 0;
    }

    /**
     * Re-wraps a typed result as a `Result!(Object)`.
     *
     * `Result!(X)` and `Result!(Object)` are unrelated classes, so a cast
     * between them cannot succeed; a new wrapper has to be built instead.
     */
    static Result!(Object) toObjectResult(R)(R typed) {
        return new Result!(Object)(typed.value, typed.numberOfBytesConsumed);
    }

    /**
     * Builds the concrete message object for the given message type.
     *
     * Params:
     *   fixedHeader = fixed header of the message
     *   varHeader   = decoded variable header (may be null)
     *   payloadObj  = decoded payload (may be null)
     * Returns: a type-specific message for CONNECT, SUBSCRIBE, SUBACK,
     *          UNSUBSCRIBE, PUBLISH and CONNACK, a plain `MqttMsg` otherwise.
     */
    static MqttMsg buildMessage(MqttFixedHeader fixedHeader, Object varHeader, Object payloadObj) {
        switch (fixedHeader.messageType()) {
            case MqttMsgType.CONNECT:
                return new MqttConnectMsg(fixedHeader, cast(MqttConnectVariableHeader)varHeader, cast(MqttConnectPayload)payloadObj);

            case MqttMsgType.SUBSCRIBE:
                return new MqttSubscribeMsg(fixedHeader, cast(MqttMsgIdVariableHeader)varHeader, cast(MqttSubscribePayload)payloadObj);

            case MqttMsgType.SUBACK:
                return new MqttSubAckMsg(fixedHeader, cast(MqttMsgIdVariableHeader)varHeader, cast(MqttSubAckPayload)payloadObj);

            case MqttMsgType.UNSUBSCRIBE:
                return new MqttUnsubscribeMsg(fixedHeader, cast(MqttMsgIdVariableHeader)varHeader, cast(MqttUnsubscribePayload)payloadObj);

            case MqttMsgType.PUBLISH:
                return new MqttPublishMsg(fixedHeader, cast(MqttPublishVariableHeader)varHeader, cast(MqttPublishPayload)payloadObj);

            case MqttMsgType.CONNACK:
                return new MqttConnAckMessage(fixedHeader, cast(MqttConnAckVariableHeader)varHeader);

            default:
                return new MqttMsg(fixedHeader, varHeader, payloadObj);
        }
    }

    /**
     * Decodes the fixed header. It's one byte for the flags and then variable bytes for the remaining length.
     *
     * Params:
     *   buffer = the buffer to decode from
     * Returns: the fixed header
     * Throws: Exception when the remaining length uses more than 4 bytes
     */
    static MqttFixedHeader decodeFixedHeader(ref ByteBuf buffer) {
        short b1 = buffer.readUnsignedByte();

        MqttMsgType messageType = to!MqttMsgType(b1 >> 4);
        bool dupFlag = (b1 & 0x08) == 0x08;
        int qosLevel = (b1 & 0x06) >> 1;
        bool retain = (b1 & 0x01) != 0;

        // Remaining length: 7 data bits per byte, bit 7 flags a following byte.
        int remainingLength = 0;
        int multiplier = 1;
        short digit;
        int loops = 0;
        do {
            digit = buffer.readUnsignedByte();
            remainingLength += (digit & 127) * multiplier;
            multiplier *= 128;
            loops++;
        } while ((digit & 128) != 0 && loops < 4);

        // MQTT protocol limits Remaining Length to 4 bytes
        if (loops == 4 && (digit & 128) != 0) {
            throw new Exception("remaining length exceeds 4 digits (" ~ to!string(messageType) ~ ')');
        }
        MqttFixedHeader decodedFixedHeader =
            new MqttFixedHeader(messageType, dupFlag, to!(MqttQoS)(qosLevel), retain, remainingLength);
        return MqttCodecUtil.validateFixedHeader(MqttCodecUtil.resetUnusedFields(decodedFixedHeader));
    }

    /**
     * Decodes the variable header (if any).
     *
     * Params:
     *   buffer = the buffer to decode from
     *   mqttFixedHeader = MqttFixedHeader of the same message
     * Returns: the variable header and the bytes consumed; a null value with
     *          zero bytes for message types that have no variable header
     */
    static Result!(Object) decodeVariableHeader(ref ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
        switch (mqttFixedHeader.messageType()) {
            case MqttMsgType.CONNECT:
                return toObjectResult(decodeConnectionVariableHeader(buffer));

            case MqttMsgType.CONNACK:
                return toObjectResult(decodeConnAckVariableHeader(buffer));

            case MqttMsgType.SUBSCRIBE:
            case MqttMsgType.UNSUBSCRIBE:
            case MqttMsgType.SUBACK:
            case MqttMsgType.UNSUBACK:
            case MqttMsgType.PUBACK:
            case MqttMsgType.PUBREC:
            case MqttMsgType.PUBCOMP:
            case MqttMsgType.PUBREL:
                return toObjectResult(decodeMessageIdVariableHeader(buffer));

            case MqttMsgType.PUBLISH:
                return toObjectResult(decodePublishVariableHeader(buffer, mqttFixedHeader));

            case MqttMsgType.PINGREQ:
            case MqttMsgType.PINGRESP:
            case MqttMsgType.DISCONNECT:
                // Empty variable header
                return new Result!(Object)(null, 0);

            default:
                return new Result!(Object)(null, 0);
        }
    }

    /**
     * Decodes the CONNECT variable header: protocol name, protocol level,
     * connect flags and keep-alive.
     *
     * Throws: Exception when the protocol name is "MQTT" and the reserved
     *         flag bit is set
     */
    static Result!(MqttConnectVariableHeader) decodeConnectionVariableHeader(ref ByteBuf buffer) {
        Result!(string) protoString = decodeString(buffer);
        int numberOfBytesConsumed = protoString.numberOfBytesConsumed;

        byte protocolLevel = buffer.readByte();
        numberOfBytesConsumed += 1;

        MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(cast(string)protoString.value, protocolLevel);

        int b1 = buffer.readUnsignedByte();
        numberOfBytesConsumed += 1;

        Result!(int) keepAlive = decodeMsbLsb(buffer);
        numberOfBytesConsumed += keepAlive.numberOfBytesConsumed;

        bool hasUserName = (b1 & 0x80) == 0x80;
        bool hasPassword = (b1 & 0x40) == 0x40;
        bool willRetain = (b1 & 0x20) == 0x20;
        int willQos = (b1 & 0x18) >> 3;
        bool willFlag = (b1 & 0x04) == 0x04;
        bool cleanSession = (b1 & 0x02) == 0x02;
        if (mqttVersion.protocolName() == "MQTT") {
            bool zeroReservedFlag = (b1 & 0x01) == 0x0;
            if (!zeroReservedFlag) {
                // MQTT v3.1.1: The Server MUST validate that the reserved flag in the CONNECT Control Packet is
                // set to zero and disconnect the Client if it is not zero.
                // See http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349230
                throw new Exception("non-zero reserved flag");
            }
        }

        MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader(
            mqttVersion.protocolName(),
            mqttVersion.protocolLevel(),
            hasUserName,
            hasPassword,
            willRetain,
            willQos,
            willFlag,
            cleanSession,
            keepAlive.value);
        return new Result!(MqttConnectVariableHeader)(mqttConnectVariableHeader, numberOfBytesConsumed);
    }

    /// Decodes the two-byte CONNACK variable header (session-present flag and return code).
    static Result!(MqttConnAckVariableHeader) decodeConnAckVariableHeader(ref ByteBuf buffer) {
        bool sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01;
        byte returnCode = cast(byte)buffer.readByte();
        int numberOfBytesConsumed = 2;
        MqttConnAckVariableHeader mqttConnAckVariableHeader =
            new MqttConnAckVariableHeader(to!(MqttConnectReturnCode)(returnCode), sessionPresent);
        return new Result!(MqttConnAckVariableHeader)(mqttConnAckVariableHeader, numberOfBytesConsumed);
    }

    /// Decodes a variable header that consists only of a message id.
    static Result!(MqttMsgIdVariableHeader) decodeMessageIdVariableHeader(ref ByteBuf buffer) {
        Result!(int) messageId = decodeMessageId(buffer);
        return new Result!(MqttMsgIdVariableHeader)(
            to!MqttMsgIdVariableHeader(MqttMsgIdVariableHeader.from(messageId.value)),
            messageId.numberOfBytesConsumed);
    }

    /**
     * Decodes the PUBLISH variable header: topic name, plus a message id when
     * the fixed header's QoS level is greater than zero (otherwise the id is -1).
     *
     * Throws: Exception when the topic name is rejected by
     *         `MqttCodecUtil.isValidPublishTopicName`
     */
    static Result!(MqttPublishVariableHeader) decodePublishVariableHeader(
        ref ByteBuf buffer,
        MqttFixedHeader mqttFixedHeader) {
        Result!(string) decodedTopic = decodeString(buffer);
        if (!MqttCodecUtil.isValidPublishTopicName(decodedTopic.value)) {
            throw new Exception("invalid publish topic name: " ~ decodedTopic.value ~ " (contains wildcards)");
        }
        int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed;

        int messageId = -1;
        if (to!int(mqttFixedHeader.qosLevel()) > 0) {
            Result!(int) decodedMessageId = decodeMessageId(buffer);
            messageId = decodedMessageId.value;
            numberOfBytesConsumed += decodedMessageId.numberOfBytesConsumed;
        }
        MqttPublishVariableHeader mqttPublishVariableHeader =
            new MqttPublishVariableHeader(decodedTopic.value, messageId);
        return new Result!(MqttPublishVariableHeader)(mqttPublishVariableHeader, numberOfBytesConsumed);
    }

    /**
     * Decodes a two-byte message id.
     *
     * Throws: Exception when `MqttCodecUtil.isValidMessageId` rejects the value
     */
    static Result!(int) decodeMessageId(ref ByteBuf buffer) {
        Result!(int) messageId = decodeMsbLsb(buffer);
        if (!MqttCodecUtil.isValidMessageId(messageId.value)) {
            throw new Exception("invalid messageId: " ~ to!string(messageId.value));
        }
        return messageId;
    }

    /**
     * Decodes the payload.
     *
     * Params:
     *   buffer = the buffer to decode from
     *   messageType = type of the message being decoded
     *   bytesRemainingInVariablePart = bytes remaining
     *   variableHeader = variable header of the same message
     * Returns: the payload and the bytes consumed; a null value with zero
     *          bytes for message types without a decodable payload
     */
    static Result!(Object) decodePayload(
        ref ByteBuf buffer,
        MqttMsgType messageType,
        int bytesRemainingInVariablePart,
        Object variableHeader) {
        switch (messageType) {
            case MqttMsgType.CONNECT:
                return toObjectResult(decodeConnectionPayload(buffer, cast(MqttConnectVariableHeader) variableHeader));

            case MqttMsgType.SUBSCRIBE:
                return toObjectResult(decodeSubscribePayload(buffer, bytesRemainingInVariablePart));

            case MqttMsgType.SUBACK:
                return toObjectResult(decodeSubackPayload(buffer, bytesRemainingInVariablePart));

            case MqttMsgType.UNSUBSCRIBE:
                return toObjectResult(decodeUnsubscribePayload(buffer, bytesRemainingInVariablePart));

            case MqttMsgType.PUBLISH:
                return toObjectResult(decodePublishPayload(buffer, bytesRemainingInVariablePart));

            default:
                // unknown payload , no byte consumed
                return new Result!(Object)(null, 0);
        }
    }

    /**
     * Decodes the CONNECT payload: client id, then will topic and will
     * message when the will flag is set, then user name and password when
     * the corresponding flags are set. Absent fields are passed on as null.
     *
     * Throws: Exception when `MqttCodecUtil.isValidClientId` rejects the client id
     */
    static Result!(MqttConnectPayload) decodeConnectionPayload(
        ref ByteBuf buffer,
        MqttConnectVariableHeader mqttConnectVariableHeader) {
        Result!(string) decodedClientId = decodeString(buffer);

        string decodedClientIdValue = decodedClientId.value;

        MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
            to!byte(mqttConnectVariableHeader.mqtt_version()));
        if (!MqttCodecUtil.isValidClientId(mqttVersion, decodedClientIdValue)) {
            throw new Exception("invalid clientIdentifier: " ~ decodedClientIdValue);
        }
        int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;

        Result!(string) decodedWillTopic = null;
        Result!(string) decodedWillMessage = null;
        if (mqttConnectVariableHeader.isWillFlag()) {
            decodedWillTopic = decodeString(buffer, 0, 32767);
            numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
            decodedWillMessage = decodeAsciiString(buffer);
            numberOfBytesConsumed += decodedWillMessage.numberOfBytesConsumed;
        }
        Result!(string) decodedUserName = null;
        Result!(string) decodedPassword = null;
        if (mqttConnectVariableHeader.hasUserName()) {
            decodedUserName = decodeString(buffer);
            numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
        }
        if (mqttConnectVariableHeader.hasPassword()) {
            decodedPassword = decodeString(buffer);
            numberOfBytesConsumed += decodedPassword.numberOfBytesConsumed;
        }

        MqttConnectPayload mqttConnectPayload =
            new MqttConnectPayload(
                decodedClientId.value,
                decodedWillTopic !is null ? decodedWillTopic.value : null,
                decodedWillMessage !is null ? decodedWillMessage.value : null,
                decodedUserName !is null ? decodedUserName.value : null,
                decodedPassword !is null ? decodedPassword.value : null);
        return new Result!(MqttConnectPayload)(mqttConnectPayload, numberOfBytesConsumed);
    }

    /**
     * Decodes the SUBSCRIBE payload: (topic name, QoS byte) pairs are read
     * until `bytesRemainingInVariablePart` bytes have been consumed.
     */
    static Result!(MqttSubscribePayload) decodeSubscribePayload(
        ref ByteBuf buffer,
        int bytesRemainingInVariablePart) {
        MqttTopicSubscription[] subscribeTopics;
        int numberOfBytesConsumed = 0;
        while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
            Result!(string) decodedTopicName = decodeString(buffer);
            numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
            // Only the two low bits of the options byte carry the QoS.
            int qos = buffer.readUnsignedByte() & 0x03;
            numberOfBytesConsumed++;
            subscribeTopics ~= new MqttTopicSubscription(decodedTopicName.value, to!MqttQoS(qos));
        }
        return new Result!(MqttSubscribePayload)(new MqttSubscribePayload(subscribeTopics), numberOfBytesConsumed);
    }

    /// Decodes the SUBACK payload: one granted-QoS byte per remaining byte.
    static Result!(MqttSubAckPayload) decodeSubackPayload(
        ref ByteBuf buffer,
        int bytesRemainingInVariablePart) {
        int[] grantedQos;
        int numberOfBytesConsumed = 0;
        while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
            int qos = buffer.readUnsignedByte() & 0x03;
            numberOfBytesConsumed++;
            grantedQos ~= qos;
        }
        return new Result!(MqttSubAckPayload)(new MqttSubAckPayload(grantedQos), numberOfBytesConsumed);
    }

    /// Decodes the UNSUBSCRIBE payload: topic names until the remaining bytes are consumed.
    static Result!(MqttUnsubscribePayload) decodeUnsubscribePayload(
        ref ByteBuf buffer,
        int bytesRemainingInVariablePart) {
        string[] unsubscribeTopics;
        int numberOfBytesConsumed = 0;
        while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
            Result!(string) decodedTopicName = decodeString(buffer);
            numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
            unsubscribeTopics ~= decodedTopicName.value;
        }
        return new Result!(MqttUnsubscribePayload)(
            new MqttUnsubscribePayload(unsubscribeTopics),
            numberOfBytesConsumed);
    }

    /// Takes the remaining bytes of the message as the PUBLISH payload.
    static Result!(MqttPublishPayload) decodePublishPayload(ref ByteBuf buffer, int bytesRemainingInVariablePart) {
        ByteBuf b = buffer.readSlice(bytesRemainingInVariablePart);
        return new Result!(MqttPublishPayload)(new MqttPublishPayload(b.data()), bytesRemainingInVariablePart);
    }

    /// Decodes a length-prefixed string with no additional size restriction.
    static Result!(string) decodeString(ref ByteBuf buffer) {
        return decodeString(buffer, 0, int.max);
    }

    /**
     * Decodes a length-prefixed string and yields a null value (with the same
     * byte count) when any code unit is greater than 127.
     */
    static Result!(string) decodeAsciiString(ref ByteBuf buffer) {
        Result!(string) result = decodeString(buffer, 0, int.max);
        string s = result.value;
        foreach (char c; s) {
            if (c > 127) {
                return new Result!(string)(null, result.numberOfBytesConsumed);
            }
        }
        return new Result!(string)(s, result.numberOfBytesConsumed);
    }

    /**
     * Decodes a string prefixed by a two-byte length.
     *
     * Params:
     *   buffer = the buffer to decode from
     *   minBytes = smallest accepted string size
     *   maxBytes = largest accepted string size
     * Returns: the string and the bytes consumed (prefix included); when the
     *          size is outside [minBytes, maxBytes] the bytes are skipped and
     *          the value is null
     */
    static Result!(string) decodeString(ref ByteBuf buffer, int minBytes, int maxBytes) {
        Result!(int) decodedSize = decodeMsbLsb(buffer);
        int size = decodedSize.value;
        int numberOfBytesConsumed = decodedSize.numberOfBytesConsumed;
        if (size < minBytes || size > maxBytes) {
            buffer.skipBytes(size);
            numberOfBytesConsumed += size;
            return new Result!(string)(null, numberOfBytesConsumed);
        }
        string s = buffer.toString(buffer.readerIndex(), size);
        buffer.skipBytes(size);
        numberOfBytesConsumed += size;
        return new Result!(string)(s, numberOfBytesConsumed);
    }

    /// Decodes a two-byte big-endian integer, accepting the full 0..65535 range.
    static Result!(int) decodeMsbLsb(ref ByteBuf buffer) {
        return decodeMsbLsb(buffer, 0, 65535);
    }

    /**
     * Decodes a two-byte big-endian integer.
     *
     * Returns: the value, or -1 when it lies outside [min, max]; two bytes
     *          are reported as consumed in both cases
     */
    static Result!(int) decodeMsbLsb(ref ByteBuf buffer, int min, int max) {
        short msbSize = buffer.readUnsignedByte();
        short lsbSize = buffer.readUnsignedByte();
        int numberOfBytesConsumed = 2;

        int result = msbSize << 8 | lsbSize;
        if (result < min || result > max) {
            result = -1;
        }
        return new Result!(int)(result, numberOfBytesConsumed);
    }

private:
    /// Size limit used by the parameterless constructor.
    static int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;

    /**
     * States of the decoder.
     * We start at READ_FIXED_HEADER, followed by
     * READ_VARIABLE_HEADER and finally READ_PAYLOAD.
     */
    enum DecoderState {
        READ_FIXED_HEADER,
        READ_VARIABLE_HEADER,
        READ_PAYLOAD,
        BAD_MESSAGE,
        DECODE_FINISH,
    }

    /// Current position in the per-message state machine.
    DecoderState _curstat;
    /// Fixed header of the message currently being decoded.
    MqttFixedHeader mqttFixedHeader;
    /// Variable header of the message currently being decoded (may be null).
    Object variableHeader;
    /// Payload of the message currently being decoded (may be null).
    Object payload;
    /// Bytes of the current message's declared remaining length not yet consumed.
    int bytesRemainingInVariablePart;

    /// Largest remaining length accepted for a single message.
    int maxBytesInMessage;
}